/**
 * @project: parallel-task
 * @package: com.ngplat.paralleltask.task
 * @filename: MeshTask.java
 *
 * Copyright (c) 2018 eSunny Info. Tech Ltd. All rights reserved.
 * 
 */
package com.ngplat.paralleltask.task;

import java.util.Date;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ngplat.paralleltask.common.Cleanable;
import com.ngplat.paralleltask.common.Pair;
import com.ngplat.paralleltask.constants.TaskConsts;
import com.ngplat.paralleltask.constants.TaskState;
import com.ngplat.paralleltask.model.TaskInfo;
import com.ngplat.paralleltask.task.event.WorkerListener;

/**
 * @typename: WorkerTask
 * @brief: 任务包装器
 * @author: KI ZCQ
 * @date: 2018年3月9日 下午2:28:22
 * @version: 1.0.0
 * @since
 * 
 */
public class WorkerTask extends Pair<TaskInfo, TaskProcessor>
		implements Comparable<WorkerTask>, WorkerListener, Cleanable {

	private static final Logger logger = LoggerFactory.getLogger(WorkerTask.class);

	// 按照优先级放入队列
	private Queue<WorkerTask> childTaskQueue = new PriorityBlockingQueue<WorkerTask>();

	// 已完成任务数
	private AtomicInteger completedTaskCount = new AtomicInteger(0);
	// 失败任务数
	private AtomicInteger failedTaskCount = new AtomicInteger(0);
	// 是否已提交过(防止重复提交, 子任务多次累计完成个数, 导致任务错误执行)
	private AtomicBoolean isSubmited = new AtomicBoolean(false);

	/**
	 * 创建一个新的实例 WorkerTask.
	 */
	public WorkerTask() {
		childTaskQueue = new PriorityBlockingQueue<WorkerTask>();
	}

	/**
	 * 创建一个新的实例 WorkerTask.
	 * 
	 * @param taskInfo
	 *            任务基本信息
	 * @param processor
	 *            任务处理器
	 */
	public WorkerTask(TaskInfo taskInfo, TaskProcessor processor) {
		super(taskInfo, processor);
		childTaskQueue = new PriorityBlockingQueue<WorkerTask>();
	}

	/**
	 * @Description: 包装生成任务对象
	 * @param taskInfo
	 *            任务基本信息
	 * @param processor
	 *            任务处理器
	 * @return
	 */
	public static WorkerTask wrapTask(TaskInfo taskInfo, TaskProcessor processor) {

		if (taskInfo == null || processor == null) {
			throw new IllegalArgumentException("NullPointerException: taskInfo or processor is null.");
		}

		// 基本信息
		processor.initialize(taskInfo);
		return new WorkerTask(taskInfo, processor);
	}

	/**
	 * @Description: 添加子任务
	 * @param task
	 * @throws InterruptedException
	 */
	public void addChildTask(WorkerTask task) throws InterruptedException {
		childTaskQueue.add(task);
	}

	/**
	 * @Description: 通知所有的子节点, 当前任务完成
	 * @param taskKey
	 * @param state
	 */
	public void notifyChildTask(final TaskState state) {

		// 更新任务状态
		this.getInfo().setState(state);
		// 完成时间
		this.getInfo().setFinishTime(new Date());

		// 打印信息
		logger.info("[任务基本信息]: " + this.getInfo().toString());

		if (childTaskQueue == null || childTaskQueue.size() == 0)
			return;

		for (WorkerTask listener : childTaskQueue) {
			listener.onTaskComplete(this.getInfo().getTaskId(), state);
		}
	}

	/**
	 * @Description: 该任务是否可以执行
	 * @return
	 */
	public boolean canRun() {
		// 父任务失败或者依赖任务未完成均不满足执行条件
		return (failedTaskCount.get() == 0 
				&& completedTaskCount.get() == getInfo().getParentIds().length
				&& !isDisabled());
	}

	/**
	 * @Description: 判断任务是否已执行完成
	 * @return
	 */
	public boolean isFinished() {
		return this.getInfo().getState() == TaskState.TASK_COMPLETE;
	}

	/**
	 * @Description: 任务状态是否可用, 如果不可用, 直接通知子任务完成了
	 * @return
	 */
	public boolean isDisabled() {
		return this.getInfo().getStatus().equals(TaskConsts.DISABLE_STATUS);
	}
	
	/**
	 * @Description: 是否可执行: 严格检查
	 * @return
	 */
	public boolean isExecuted() {
		return canRun() && !isFinished();
	}

	/**
	 * @see java.lang.Comparable#compareTo(java.lang.Object)
	 */
	@Override
	public int compareTo(WorkerTask o) {
		// 按照优先级大小进行排序
		return getInfo().getPriority() >= o.getInfo().getPriority() ? 1 : -1;
	}

	/**
	 * @Description: 提交任务, 重置标志位
	 */
	public void submitTask() {
		// 提交任务
		TaskPoolManager.DEFAULT.submit(WorkerTask.this);
		// 设置提交标志位
		isSubmited.compareAndSet(false, true);
	}

	/**
	 * @see com.ngplat.paralleltask.task.event.WorkerListener#onTaskComplete(java.lang.String,
	 *      com.ngplat.paralleltask.constants.TaskState)
	 */
	@Override
	public void onTaskComplete(String taskKey, TaskState state) {
		logger.info(
				"WorkerTask【" + taskKey + "】 work finished. try to notify Task【" + this.getInfo().getTaskId() + "】.");

		// 已完成或者不可用, 则直接提交, 但不执行
		if (isDisabled() && !isSubmited.get()) {
			submitTask();
		}

		// 父任务执行失败
		if (TaskState.TASK_FAILED == state) {
			logger.error("WorkerTask【" + taskKey + "】 work failed. Please Check it's cause.");
			this.failedTaskCount.incrementAndGet();
			return;
		}

		// 防止发生非法通知
		if (TaskState.TASK_COMPLETE != state) {
			logger.error("WorkerTask【" + taskKey + "】 Notify State is invalid. Please Check it's cause. TaskInfo: {}",
					this.getInfo().toString());
			return;
		} else {
			this.completedTaskCount.incrementAndGet();
		}

		// 检查当前任务是否可以执行 - 提交
		if (canRun() && !isSubmited.get()) {
			logger.info("WorkerTask【" + this.getInfo().getTaskId() + "】 depends have been Done, Begin to doTask. ");
			// 状态未完成
			if(!isFinished()) {
				// 更新开始时间
				this.getInfo().setBeginTime(new Date());
				// 更新状态
				this.getInfo().setState(TaskState.TASK_RUNNING);	
			}
			// 提交任务
			submitTask();
		}
	}

	/**
	 * @see com.ngplat.paralleltask.common.Cleanable#cleanResource()
	 */
	@Override
	public void cleanResource() {
		// 完成任务数
		completedTaskCount = new AtomicInteger(0);
		// 失败任务数
		failedTaskCount = new AtomicInteger(0);
		// 设置提交标志位
		isSubmited.compareAndSet(true, false);
		// 任务信息初始化
		this.getInfo().cleanResource();
		// 子任务
		if (childTaskQueue != null && childTaskQueue.size() > 0) {
			for (WorkerTask task : childTaskQueue) {
				task.cleanResource();
			}
		}
	}

	/**
	 * @see java.lang.Object#toString()
	 */
	@Override
	public String toString() {

		if (this.getInfo() == null) {
			return "WorkerTask [completedTaskCount=" + completedTaskCount + ", failedTaskCount=" + failedTaskCount
					+ ", isSubmited=" + isSubmited + "]";
		}

		return "WorkerTask [TaskInfo=" + this.getInfo().toString() + ", completedTaskCount=" + completedTaskCount
				+ ", failedTaskCount=" + failedTaskCount + ", isSubmited=" + isSubmited + "]";
	}

}
